1.环境说明
在前面几期的课程里面讲过了 Flink 开发环境的搭建和应用的部署以及运行,今天的课程主要是讲 Flink 的客户端操作。本次讲解以实际操作为主。这次课程是基于社区的 Flink 1.7.2 版本,操作系统是 Mac 系统,浏览器是 Google Chrome 浏览器。有关开发环境的准备和集群的部署,请参考「开发环境搭建和应用的配置、部署及运行」的内容。
2.课程概要
如下图所示,Flink 提供了丰富的客户端操作来提交任务和与任务进行交互,包括 Flink 命令行,Scala Shell,SQL Client,Restful API 和 Web。Flink 首先提供的最重要的是命令行,其次是 SQL Client 用于提交 SQL 任务的运行,还有就是 Scala Shell 提交 Table API 的任务。同时,Flink 也提供了Restful 服务,用户可以通过 http 方式进行调用。此外,还有 Web 的方式可以提交任务。

在 Flink 安装目录的 bin 目录下面可以看到有 flink, start-scala-shell.sh 和 sql-client.sh 等文件,这些都是客户端操作的入口。

3.Flink 客户端操作
3.1 Flink 命令行
Flink 的命令行参数很多,输入 flink - h 能看到完整的说明:
1  | flink-1.7.2 bin/flink -h  | 
如果想看某一个命令的参数,比如 Run 命令,输入:
1  | flink-1.7.2 bin/flink run -h  | 
本文主要讲解常见的一些操作,更详细的文档请参考: Flink 命令行官方文档。
3.1.1 Standalone
首先启动一个 Standalone 的集群:
1  | flink-1.7.2 bin/start-cluster.sh  | 
打开 http://127.0.0.1:8081 能看到 Web 界面。
Run
运行任务,以 Flink 自带的例子 TopSpeedWindowing 为例:
1  | flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar  | 
运行起来后默认是 1 个并发:

点左侧「Task Manager」,然后点「Stdout」能看到输出日志:

或者查看本地 Log 目录下的 *.out 文件:

List
查看任务列表:
1  | flink-1.7.2 bin/flink list -m 127.0.0.1:8081  | 
Stop
停止任务。通过 -m 来指定要停止的 JobManager 的主机地址和端口。
1  | flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb  | 
从日志里面能看出 Stop 命令执行失败了。一个 Job 能够被 Stop 要求所有的 Source 都是可以 Stoppable 的,即实现了 StoppableFunction 接口。
1  | /**  | 
Cancel
取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,会保存 Savepoint,否则不会保存 Savepoint。
1  | flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de  | 
也可以在停止的时候显示指定 Savepoint 目录。
1  | flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759  | 
取消和停止(流作业)的区别如下:
- cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
 - stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。
 
Savepoint
触发 Savepoint。
1  | flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint  | 
说明:Savepoint 和 Checkpoint 的区别(详见文档):
- Checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;Checkpoint 是作业 failover 的时候自动使用,不需要用户指定。
 - Savepoint 是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。Savepoint 一般用于程序的版本更新(详见文档),Bug 修复,A/B Test 等场景,需要用户指定。
 
通过 -s 参数从指定的 Savepoint 启动:
1  | flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar  | 
查看 JobManager 的日志,能够看到类似这样的 Log:
1  | 2019-03-28 10:30:53,957 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  | 
Modify
修改任务并行度。
为了方便演示,我们修改 conf/flink-conf.yaml 将 Task Slot 数从默认的 1 改为 4,并配置 Savepoint 目录。(Modify 参数后面接 -s 指定 Savepoint 路径当前版本可能有 Bug,提示无法识别)
1  | taskmanager.numberOfTaskSlots: 4  | 
修改参数后需要重启集群生效,然后再启动任务:
1  | flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh  | 
从页面上能看到 Task Slot 变为了 4,这时候任务的默认并发度是 1。


通过 Modify 命令依次将并发度修改为 4 和 3,可以看到每次 Modify 命令都会触发一次 Savepoint。
1  | flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa  | 

查看 JobManager 的日志,可以看到:
1  | 2019-06-17 09:05:11,179 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()  | 
Info
Info 命令是用来查看 Flink 任务的执行计划(StreamGraph)的。
1  | flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar  | 
拷贝输出的 Json 内容,粘贴到这个网站:http://flink.apache.org/visualizer/

可以和实际运行的物理执行计划对比:

3.1.2 Yarn per-job
单任务 Attach 模式
默认是 Attach 模式,即客户端会一直等待直到程序结束才会退出。
- 通过 -m yarn-cluster 指定 Yarn 模式
 - Yarn 上显示名字为 Flink session cluster,这个 Batch 的 Wordcount 任务运行完会 FINISHED。
 - 客户端能看到结果输出
 
1  | [admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]  | 


如果我们以 Attach 模式运行 Streaming 的任务,客户端会一直等待不退出,可以运行以下的例子试验下:
1  | ./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar  | 
单任务 Detached 模式
- 由于是 Detached 模式,客户端提交完任务就退出了
 - Yarn 上显示为 Flink per-job cluster
 
1  | $./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar  | 


3.1.3 Yarn session
启动 Session
1  | ./bin/yarn-session.sh -tm 2048 -s 3  | 
表示启动一个 Yarn session 集群,每个 TM 的内存是 2 G,每个 TM 有 3 个 Slot。(注意:-n 参数不生效)
1  | flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3  | 
客户端默认是 Attach 模式,不会退出:
- 可以 ctrl + c 退出,然后再通过 ./bin/yarn-session.sh -id application_1532332183347_0726 连上来;
 - 或者启动的时候用 -d 则为 detached 模式
Yarn 上显示为 Flink session cluster; 


- 在本机的临时目录(有些机器是 /tmp 目录)下会生成一个文件:
 
1  | flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu  | 
提交任务
1  | ./bin/flink run ./examples/batch/WordCount.jar  | 
将会根据 /tmp/.yarn-properties-admin 文件内容提交到了刚启动的 Session。
1  | flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar  | 
运行结束后 TM 的资源会释放。

提交到指定的 Session
通过 -yid 参数来提交到指定的 Session。
1  | $./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar  | 

注:Blink版本 的 Session 与 Flink 的 Session 的区别:
- Flink 的 session -n 参数不生效,而且不会提前启动 TM;
 - Blink 的 session 可以通过 -n 指定启动多少个 TM,而且 TM 会提前起来;
 
3.2 Scala Shell
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html
3.2.1 Deploy
Local
1  | $bin/start-scala-shell.sh local  | 
任务运行说明:
- Batch 任务内置了 benv 变量,通过 print() 将结果输出到控制台;
 - Streaming 任务内置了 senv 变量,通过 senv.execute(“job name”) 来提交任务,且 Datastream 的输出只有在 Local 模式下打印到控制台;
 
Remote
先启动一个 yarn session cluster:
1  | $./bin/yarn-session.sh -tm 2048 -s 3  | 
启动 scala shell,连到 jm:
1  | $bin/start-scala-shell.sh remote z054.sqa.net 28665  | 
Yarn
1  | $./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn  | 

按 CTRL + C 退出 Shell 后,这个 Flink cluster 还会继续运行,不会退出。
3.2.2 Execute
DataSet
1  | flink-1.7.2 bin/stop-cluster.sh  | 
对 DataSet 任务来说,print() 会触发任务的执行。

也可以将结果输出到文件(先删除 /tmp/out1,不然会报错同名文件已经存在),继续执行以下命令:
1  | scala> counts.writeAsText("/tmp/out1")  | 
查看 /tmp/out1 文件就能看到输出结果。
1  | flink-1.7.2 cat /tmp/out1  | 

DataSteam
1  | scala> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--")  | 
对 DataStream 任务,print() 并不会触发任务的执行,需要显示调用 execute(“job name”) 才会执行任务。


TableAPI
在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用 btenv.sqlQuery 提交 SQL 查询),社区版本 Flink 1.8 会支持 TableAPI: https://issues.apache.org/jira/browse/FLINK-9555
3.3 SQL Client Beta
SQL Client 目前还只是测试版,处于开发阶段,只能用于 SQL 的原型验证,不推荐在生产环境使用。
3.3.1 基本用法
1  | flink-1.7.2 bin/start-cluster.sh  | 
Select 查询
1  | Flink SQL> SELECT 'Hello World';  | 

按 ”Q” 退出这个界面
打开 http://127.0.0.1:8081 能看到这条 Select 语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的 Custom Source,输出用的是 Stream Collect Sink,且只输出一条结果。
注意:如果本机的临时目录存在类似 .yarn-properties-baoniu 的文件,任务会提交到 Yarn 上。


Explain
Explain 命令可以查看 SQL 的执行计划。
1  | Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;  | 
3.3.2 结果展示
SQL Client 支持两种模式来维护并展示查询结果:
- table mode: 在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命令启用 table mode;
 
1  | SET execution.result-mode=table  | 
- changlog mode: 不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。
 
1  | SET execution.result-mode=changelog  | 
接下来通过实际的例子进行演示。
Table mode
1  | Flink SQL> SET execution.result-mode=table;  | 
运行结果如下图所示:



Changlog mode
1  | Flink SQL> SET execution.result-mode=changelog;  | 
运行结果如下图所示:

其中 ‘-’ 代表的就是撤回消息。


3.3.3 Environment Files
目前的 SQL Client 还不支持 DDL 语句,只能通过 yaml 文件的方式来定义 SQL 查询需要的表,UDF 和运行参数等信息。
首先,准备 env.yaml 和 input.csv 两个文件。
1  | flink-1.7.2 cat /tmp/env.yaml  | 
启动 SQL Client:
1  | flink-1.7.2 ./bin/sql-client.sh embedded -e /tmp/env.yaml  | 



使用 insert into 写入结果表:
1  | Flink SQL> insert into MyTableSink select * from MyTableSource;  | 


查询生成的结果数据文件:
1  | flink-1.7.2 cat /tmp/output.csv  | 
也可以在 Environment 文件里面定义 UDF,在 SQL Client 里面通过 「HOW FUNCTIONS」查询和使用,这里就不再说明了。
SQL Client 功能社区还在开发中,详见 FLIP-24。
3.4 Restful API
接下来我们演示如何通过 Rest API 来提交 Jar 包和执行任务。
更详细的操作请参考 Flink 的 Restful API 文档:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
1  | flink-1.7.2 curl http://127.0.0.1:8081/overview  | 


Restful API 还提供了很多监控和 Metrics 相关的功能,对于任务提交的操作也支持的比较全面。
3.5 Web
在 Flink Dashboard 页面左侧可以看到有个「Submit new Job」的地方,用户可以上传 Jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。

4.结束
本期的课程到这里就结束了,我们主要讲解了 Flink 的 5 种任务提交的方式。熟练掌握各种任务提交方式,有利于提高我们日常的开发和运维效率。
视频回顾:https://zh.ververica.com/developers/flink-training-course2/